package com.lyon.demo.netty.server;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.SerializeUtil;
import cn.hutool.core.util.StrUtil;
import com.lyon.demo.common.spi.annotation.LyonSpi;
import com.lyon.demo.common.spi.annotation.Signleton;
import com.lyon.demo.netty.core.bytebuf.CommandDecoder;
import com.lyon.demo.netty.core.bytebuf.CommandEncoder;
import com.lyon.demo.netty.server.exception.ServiceNotFoundException;
import com.lyon.demo.rpc.api.core.*;
import com.lyon.demo.rpc.api.endpoint.TransportServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author Lyon
 */
@Slf4j
@LyonSpi(value = CommonProtocol.NETTY)
@Signleton
public class NettyTransportServer implements TransportServer {

    private ServerBootstrap serverBootstrap;
    private RpcServiceHandlerRegistry rpcServiceHandlerRegistry;
    private int port;

    @Override
    public SocketAddress start(RpcServiceHandlerRegistry rpcServiceHandlerRegistry, int port) throws Exception {
        this.rpcServiceHandlerRegistry = rpcServiceHandlerRegistry;
        this.port = port;
        createServerBootstrap();
        return new InetSocketAddress("localhost",port);
    }

    private void createServerBootstrap() {
        if (serverBootstrap!=null){
            return;
        }
        synchronized (NettyTransportServer.class) {
            if (serverBootstrap!=null){
                return;
            }
            doCreateServerBootstrap();
        }

    }

    @SneakyThrows
    private void doCreateServerBootstrap() {
        ServerBootstrap backup = new ServerBootstrap();
        final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(2);
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup(5);
        backup.group(acceptGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new CommandDecoder())
                                .addLast(new RpcServerChannelHandler())
                                .addLast(new CommandEncoder())
                                ;
                    }
                });
        final ChannelFuture channelFuture = backup.bind(port);
        if (!channelFuture.await(3000, TimeUnit.MICROSECONDS)) {
            throw new TimeoutException(StrUtil.format("rpc-netty-server [{}] start timeout ..",port));
        }
        log.info("rpc-netty- starter ..");
    }

    @Override
    public void stop() {

    }

    class RpcServerChannelHandler extends SimpleChannelInboundHandler<Command> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Command command) throws Exception {
            RpcRequest rpcRequest = SerializeUtil.deserialize(command.getPayload());
            final Object[] args = SerializeUtil.deserialize(rpcRequest.getArgs());
            final Object service = rpcServiceHandlerRegistry.getService(rpcRequest.getInterfaceName());
            Assert.notNull(service,() -> new ServiceNotFoundException(rpcRequest.getInterfaceName()));
            try {
                final Class<Object> clazz = ClassUtil.loadClass(rpcRequest.getInterfaceName());
//                Object result = method.invoke(service,"2131");
//                final Method methodOfObj = ReflectUtil.getMethodOfObj(service, rpcRequest.getMethodName(), args);
                Object result = ReflectUtil.invoke(service, rpcRequest.getMethodName(),args);
                final Command responseCommand = command.clone();
                responseCommand.setPayload(SerializeUtil.serialize(Result.success(result)));
                ctx.channel().writeAndFlush(responseCommand);
            } catch (Exception ex) {
                log.error("execute rpc method ERROR .." ,ex);
            }
        }
    }

}
